-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[external-assets] Allow asset jobs to combine materializations and observations #19667
Conversation
1098e02
to
9a78a94
Compare
] == {AssetKey("upstream_asset")} | ||
assert defs.get_asset_graph().asset_dep_graph["upstream"][AssetKey("downstream_asset")] == { | ||
AssetKey("upstream_asset") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaner to test dependencies in the definitions asset graph than the global asset job, which is an implementation detail.
c30c846
to
e01be6f
Compare
62f4e96
to
9d22a35
Compare
d881aa6
to
a1490ca
Compare
be3a82e
to
33f844e
Compare
58c9a32
to
0e3c817
Compare
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-gowd7vpd4-elementl.vercel.app Direct link to changed pages: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some intermediate comments while I process this more. As I stated in slack, I consider this diff extremely risky as written and I implore you to consider approaches that break this up into smaller, more incremental, lower risk changes. I can grab some time with you later.
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"], | ||
source_assets: Sequence["SourceAsset"], | ||
assets_to_execute_by_node_handle: Mapping[ | ||
NodeHandle, Union["AssetsDefinition", "SourceAsset"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a Union now? I thought the point was to consolidate internal code paths
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per our discussion, I think it would be useful to add a temporary TypeAlias for these Unions so there is a place in the code base to document what we are doing. You can explain how this is a temporary measure. Looking at this without context it makes very little sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number of instanceof
s embedded within this function is terrifying. It is extremely difficult to review with any confidence. We need a more coherent model to understand when it is AssetsDefinition
and when it is SourceAsset
if isinstance(asset, AssetsDefinition): | ||
resolved_source_assets += asset.to_source_assets() | ||
resolved_other_assets += asset.to_source_assets() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow I never saw to_source_assets
before. My brain is melting
assets: Sequence[AssetsDefinition], | ||
source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, | ||
assets_to_execute: Sequence[Union[AssetsDefinition, SourceAsset]], | ||
other_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment might have gotten lost so if my be repetive.
other_assets
is not a great name. What it means, as far as I understand, is that it is an additional set of assets that is available to source i/o manager information from. I think we should name it something more specific accordingly.
|
||
assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) | ||
|
||
job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset) | ||
job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We no longer need to provide the external asset when getting the job that uses it purely for loading.
def observe( | ||
source_assets: Sequence[SourceAsset], | ||
assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now a union because externals can be observed too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per offline discussion, @smackesey is going to do a series of pre-refactors to build_assets_job
and from_graph_and_assets_node_mapping
to make incremental changes to those functions more palatable.
b7a9ffc
to
d5de819
Compare
a178fec
to
8a81206
Compare
d5de819
to
83293c9
Compare
8a81206
to
35bdc6a
Compare
83293c9
to
6596798
Compare
35bdc6a
to
ec160de
Compare
620d0c2
to
c1ee471
Compare
c7142ad
to
94186ae
Compare
86ebf75
to
06e3dba
Compare
4a3c5b2
to
46c50a5
Compare
46c50a5
to
5deee20
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very exciting!
…servations (#19667) ## Summary & Motivation Lift the restriction that asset jobs cannot contain both observations and materializations. This does _not_ change existing user-constructed jobs using public `define_asset_job`-- it just lifts the restriction so that it is now _possible_ to construct jobs that contain both observations and materializations. This will facilitate ongoing asset job refactoring. I don't think we should encourage/advertise this functionality yet because the result of an observation still cannot be used to determine whether to skip downstream steps within a run. ## How I Tested These Changes Updated "mixed asset job" test which previously checked for an error, now checks that it successfully executes with correct results.
Summary & Motivation
Lift the restriction that asset jobs cannot contain both observations and materializations. This does not change existing user-constructed jobs using public
define_asset_job
-- it just lifts the restriction so that it is now possible to construct jobs that contain both observations and materializations. This will facilitate ongoing asset job refactoring. I don't think we should encourage/advertise this functionality yet because the result of an observation still cannot be used to determine whether to skip downstream steps within a run.How I Tested These Changes
Updated "mixed asset job" test which previously checked for an error, now checks that it successfully executes with correct results.